package cn.quantgroup.rabbitmq.workqueue;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 消息如果丢失,未处理完成会重新发送给其他消费者
 * Created by 11 on 2016/12/13.
 */
public class RequeueWorker {
    private static final String QUEUE_NAME = "worker_queue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        //声明队列,消息接受者再次声明队列.确保消息队列存在,队列的创建是幂等的，存在的队列不会再次创建
        //开启消息持久化默认为true，此处还未用到消息持久化,所以队列声明时第二个参数直接设置为false，
        //当需要持久化时请打开该行代码的注释，并将队列声明的第二个参数设置为durable即可。
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //使用信道创建消息消费者
        /**
         * 设置通道同时处理的最大的消息数量,此处设置为1,那么当该消费者有未完成的任务时,而且没有其他空闲消费者
         * 那么消息会堆积在队列中等待处理
         */
        channel.basicQos(1);
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    doWork(message);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    System.out.println(" [x] Done");
                    //消息处理完成后程序显示调用方法来确认给消息发送者.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        /**
         * 设置自动确认为false.为false时rabbitmq不会自动确认消息的消费(在发送方未收到消费者
         * 确认之前不会删除队列中的消息).以此来保证消息不丢失,即使消费者被杀死或断开连接
         * 启动两个RequeueWorker，发送一个耗时5秒的任务,在任务执行期间断开正在执行任务的消费者连接
         * 会发现正在处理的消息被转发给另一个空闲的消费者处理了
         */
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

    /**
     * 判断消息中.的数量，每个.休眠一秒，模拟耗时的任务
     * @param task
     * @throws InterruptedException
     */
    public static void doWork(String task) throws InterruptedException{
        for(char ch : task.toCharArray()){
            if(ch == '.'){
                Thread.sleep(1000);
            }
        }
    }
}
